17  格式与逻辑错误处理 股指数据清洗

17.1 引言数据错误的普遍性

在金融数据分析中,除了缺失值外,错误数据(Error Data)同样普遍且危害更大。错误数据分为两类:

格式错误: - 日期格式混乱: “2024-01-01”与”2024/01/01”混用 - 数值类型错误: 价格存储为字符串(“100.5”而非100.5) - 编码问题: 中文字符乱码 - 单位不一致: 价格单位有”元”和”万元”

逻辑错误: - 负价格: 股票价格不可能为负 - 异常高价: PE超过1000可能数据错误 - 成交量异常: 流通股本小于成交量 - 违反约束: 价格涨停但跌幅10%

理论背景:数据质量维度

数据质量管理理论将数据质量分为六个维度(Wang & Strong, 1996):

  1. 准确性(Accuracy): 数据正确反映现实
  2. 完整性(Completeness): 无缺失值
  3. 一致性(Consistency): 数据间无矛盾
  4. 时效性(Timeliness): 数据及时更新
  5. 有效性(Validity): 符合业务规则
  6. 唯一性(Uniqueness): 无重复记录

逻辑错误直接破坏了准确性和有效性,可能导致严重的投资决策失误。

17.2 格式错误处理

17.2.1 日期格式统一

平台任务解答代码

以下代码与教学平台任务要求完全一致:

列表 17.1
# ⚠️ 平台原始代码 - 请原样输入至教学平台(注释除外),平台才会判定答案正确
import pandas as pd  # 导入Pandas数据分析库
import numpy as np  # 导入NumPy数值计算库
# 从Excel文件读取数据存入df
df=pd.read_excel('https://huoran.oss-cn-shenzhen.aliyuncs.com/20230313/xls/1635115220651761664.xls',sheet_name='Sheet1')
df1=df.drop([247])  # 删除指定行或列
df1.dropna(how='all',inplace=True)  # 删除全部为空值的行
# 修改日期并转换数据类型
df1.iloc[201,0]='2022-11-04'
df1['日期']=pd.to_datetime(df1['日期'])  # 转换为日期时间格式
# 修改指定值
df1.loc[138,'科创50']=np.nan
# 填充缺失值
df1.fillna(method='ffill',inplace=True)
print(df1)  # 输出数据框数据
列表 17.2
# =============================================================================
# 题目:处理日期格式不一致问题
# =============================================================================
# 本任务演示如何处理不同格式的日期字符串,统一转换为Pandas的Timestamp对象
# 场景:数据来源于多个系统,日期格式不统一

# ==================== 导入必要的库 ====================
import pandas as pd  # Pandas数据分析库
import numpy as np  # NumPy数值计算库

# ==================== 创建含多种日期格式的数据 ====================
data = {
    # Date列包含5种不同的日期格式
    'Date': ['2024-01-01',  # 格式1:YYYY-MM-DD(ISO标准)
             '2024/01/02',   # 格式2:YYYY/MM/DD(斜杠分隔)
             '01-03-2024',   # 格式3:DD-MM-YYYY(欧洲格式)
             '2024.01.04',   # 格式4:YYYY.MM.DD(点号分隔)
             '20240105'],    # 格式5:YYYYMMDD(紧凑格式)
    # 价格和成交量数据
    'Price': [100.5, 101.2, 99.8, 102.3, 101.5],
    'Volume': [10000, 15000, 12000, 18000, 14000]
}
# 转换为数据框
df_dates = pd.DataFrame(data)

# ==================== 显示原始数据 ====================
print('原始数据(日期格式混乱):')
print(df_dates)
print('\n数据类型:')
# .dtypes:显示每列的数据类型
# Date列是object(字符串),需要转换为datetime
print(df_dates.dtypes)
print()

# ==================== 方法1:pd.to_datetime自动解析 ====================
# pd.to_datetime():将字符串转换为日期时间对象
# errors='coerce':遇到无法解析的格式时,转换为NaT(Not a Time)
# 这是最安全的方式,不会抛出异常
df_dates['Date_Parsed'] = pd.to_datetime(df_dates['Date'], errors='coerce')
print('自动解析后:')
print(df_dates[['Date', 'Date_Parsed']])
# 输出解读:Pandas会自动推断日期格式
# - '2024-01-01' → 2024-01-01(标准格式)
# - '2024/01/02' → 2024-01-02(自动识别)
# - '01-03-2024' → NaT(无法确定是01-03还是03-01)
# - '2024.01.04' → 2024-01-04(自动识别)
# - '20240105' → 2024-01-05(自动识别紧凑格式)
print('\n解析失败(转换为NaT):')
# df_dates[df_dates['Date_Parsed'].isna()]:选择解析失败的行
print(df_dates[df_dates['Date_Parsed'].isna()][['Date']])
print()

# ==================== 方法2:指定format='mixed' ====================
# format='mixed':允许混合多种格式,尝试自动识别
# 注意:这个参数在Pandas较新版本中可用
df_dates['Date_Format2'] = pd.to_datetime(df_dates['Date'], format='mixed', errors='coerce')
print('使用format="mixed"参数:')
print(df_dates[['Date', 'Date_Format2']])
# 混合格式可以处理更多情况,但性能可能下降
print()

# ==================== 方法3:处理特殊格式 ====================
# 对于无法自动解析的格式,需要明确指定格式字符串
# '20240105' 格式:'%Y%m%d'(YYYYMMDD)
# %Y:四位年份,%m:两位月份,%d:两位日期
special_dates = pd.to_datetime('20240105', format='%Y%m%d')
print(f'特殊格式解析: {special_dates}')
# 更多格式代码:
# %y:两位年份(24)
# %b:月份缩写(Jan)
# %B:月份全称(January)
# %a:星期缩写(Mon)
# %A:星期全称(Monday)

代码深度解析:

  1. errors='coerce'参数:
    • 'raise'(默认): 遇到无法解析的值抛出异常
    • 'coerce': 无法解析转为NaT(Not a Time)
    • 'ignore': 保留原值,不转换
  2. format参数:
    • 'mixed': 自动检测多种格式
    • '%Y-%m-%d': 明确指定格式,提高效率
    • 混合格式下,性能可能下降
  3. NaT处理:
    • NaT是时间序列的NaN
    • 检测: pd.isna()pd.isnull()
    • 处理: 删除或手动修正

17.2.2 数值类型转换

列表 17.3
# =============================================================================
# 题目:将文本型数值转换为数值类型
# =============================================================================
# 本任务演示如何将存储为文本的数值数据转换为正确的数值类型
# 场景:从Excel或CSV导入的数据,数值列被识别为文本

# ==================== 导入必要的库 ====================
import pandas as pd  # Pandas数据分析库

# ==================== 创建含数值格式错误的数据 ====================
data = {
    # 股票代码列
    'Code': ['600001', '600002', '600003', '600004'],
    # 价格列:包含错误值'error'(直接转换会报错)
    'Price': ['100.5', '105.2', 'error', '98.7'],
    # PE列:包含'N/A'(不可用的标准文本)
    'PE': ['15.3', 'N/A', '18.5', '22.1'],
    # 成交量列:包含千分位逗号('10,000')
    'Volume': ['10000', '15000', '12,000', '18,000']
}
df_numeric = pd.DataFrame(data)

# ==================== 显示原始数据 ====================
print('原始数据(数值为文本):')
print(df_numeric)
print('\n数据类型:')
print(df_numeric.dtypes)
# Price、PE、Volume都是object(字符串)类型
# 无法进行数值计算(如求和、求均值)
print()

# ==================== 错误的转换方式(会报错) ====================
# df_numeric['Price'] = df_numeric['Price'].astype(float)
# 上面这行会抛出ValueError,因为'error'无法转换为float
print('注意:直接使用astype()转换含错误值的列会报错\n')

# ==================== 正确方式:pd.to_numeric + errors='coerce' ====================
# pd.to_numeric():智能转换为数值类型
# errors='coerce':无法转换的值变为NaN(而不是报错)
df_numeric['Price_Num'] = pd.to_numeric(df_numeric['Price'], errors='coerce')
df_numeric['PE_Num'] = pd.to_numeric(df_numeric['PE'], errors='coerce')

print('转换后(无法转换的变为NaN):')
print(df_numeric)
print('\n转换后数据类型:')
print(df_numeric[['Price_Num', 'PE_Num']].dtypes)
# Price_Num和PE_Num现在是float64类型
print('\n无法转换的记录:')
# 选择Price_Num为NaN的行
print(df_numeric[df_numeric['Price_Num'].isna()][['Code', 'Price']])
print()

# ==================== 处理千分位逗号 ====================
# '12,000'包含逗号,无法直接转换
# .str.replace(',', ''):将逗号替换为空字符串
# 注意:必须先确保列是字符串类型
df_numeric['Volume_Cleaned'] = df_numeric['Volume'].str.replace(',', '')
# 移除逗号后,再转换为数值
df_numeric['Volume_Num'] = pd.to_numeric(df_numeric['Volume_Cleaned'], errors='coerce')

print('处理千分位后:')
print(df_numeric[['Volume', 'Volume_Num']])
# 输出解读:
# - '10000' → 10000.0
# - '15000' → 15000.0
# - '12,000' → 12000.0(逗号被移除)
# - '18,000' → 18000.0(逗号被移除)

补充说明:类型转换策略

  1. 优先使用pd.to_numeric()而非astype():

    • 更安全:处理异常值不报错
    • 更灵活:可以指定错误处理方式
  2. 千分位处理:

    • 千分位逗号导致转换失败
    • 解决:先用str.replace()移除逗号
    • 注意:确保所有值都是字符串类型
  3. 货币符号处理:

    # "¥100.5" 或 "$100.5"
    df['Price'] = df['Price'].str.replace('¥', '').str.replace('$', '')
    df['Price'] = pd.to_numeric(df['Price'], errors='coerce')

金融应用:A股数据常遇到以下格式问题: - 价格含货币符号 - 成交量含”万”、“亿”单位 - 百分比存储为”12.5%“文本 - 日期格式不统一

17.2.3 综合格式清洗流程

列表 17.4
# =============================================================================
# 题目:金融数据的综合格式清洗
# =============================================================================
# 本任务演示一个完整的数据清洗流程,处理多种格式错误
# 场景:从多个数据源整合的股指数据,格式混乱

# ==================== 导入必要的库 ====================
import pandas as pd  # Pandas数据分析库
import numpy as np  # NumPy数值计算库

# ==================== 创建模拟的"脏"数据 ====================
data_dirty = {
    # 日期列:包含多种格式和无效值
    'Date': ['2024-01-01', '2024/01/02', '20240103', '2024-01-04', 'invalid'],
    # 股票代码列
    'Code': ['600001.SH', '600002.SH', '000001.SZ', '600004.SH', '600005.SH'],
    # 开盘价列:含货币符号和错误值
    'Open': ['¥10.5', '11.2', 'error', '12.5', '10.8'],
    # 收盘价列:含货币符号和百分号
    'Close': ['10.8', '¥11.5', '12.3', '12.7', '10.5%'],
    # 成交量列:含千分位和单位
    'Volume': ['10000', '15,000', '12,000股', '18000', '20万'],
    # PE列:含特殊符号
    'PE': ['15.3', 'N/A', '18.5', '>100', '22.1']
}
df_dirty = pd.DataFrame(data_dirty)

# ==================== 显示原始数据 ====================
print('=== 原始数据(含多种格式错误) ===')
print(df_dirty)
print()

# ==================== 清洗流程 ====================
# .copy():创建副本,避免修改原始数据
df_clean = df_dirty.copy()

# ==================== 步骤1:日期格式统一 ====================
# pd.to_datetime(..., errors='coerce'):转换日期,无法解析的变为NaT
df_clean['Date'] = pd.to_datetime(df_clean['Date'], errors='coerce')
print('步骤1: 日期格式统一')
# .isna().sum():统计转换失败的记录数
print(f'解析失败: {df_clean["Date"].isna().sum()} 条')
print()

# ==================== 步骤2:移除货币符号并转换为数值 ====================
# for循环:遍历Open和Close两列
for col in ['Open', 'Close']:
    # .astype(str):确保是字符串类型(防止非字符串列报错)
    # .str.replace('¥', ''):移除人民币符号
    # .str.replace('$', ''):移除美元符号
    df_clean[col] = df_clean[col].astype(str).str.replace('¥', '').str.replace('$', '')
    # .str.replace('%', ''):移除百分号
    df_clean[col] = df_clean[col].str.replace('%', '')
    # pd.to_numeric(..., errors='coerce'):转换为数值,无法转换的变为NaN
    df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')

print('步骤2: 价格列清洗')
print(df_clean[['Open', 'Close']].head())
print()

# ==================== 步骤3:成交量清洗 ====================
# 定义自定义函数,处理复杂的成交量格式
def clean_volume(vol_str):
    """
    清洗成交量数据

    参数:
        vol_str:成交量字符串(可能包含单位、符号)

    返回:
        float:清洗后的数值
    """
    # 检查是否为NaN
    if pd.isna(vol_str):
        return np.nan
    # 转换为字符串并去除首尾空格
    vol_str = str(vol_str).strip()

    # 处理"万"单位(乘以10000)
    if '万' in vol_str:
        # .replace('万', ''):移除"万"字
        # .replace(',', ''):移除千分位逗号
        # float(...):转换为浮点数
        # * 10000:乘以一万
        return float(vol_str.replace('万', '').replace(',', '')) * 10000
    # 处理"亿"单位(乘以100000000)
    elif '亿' in vol_str:
        return float(vol_str.replace('亿', '').replace(',', '')) * 100000000
    # 处理"股"单位
    elif '股' in vol_str:
        return float(vol_str.replace('股', '').replace(',', ''))
    # 其他情况:只移除逗号
    else:
        return float(vol_str.replace(',', ''))

# .apply(clean_volume):对每个元素应用clean_volume函数
df_clean['Volume_Num'] = df_clean['Volume'].apply(clean_volume)

print('步骤3: 成交量清洗')
print(df_clean[['Volume', 'Volume_Num']])
print()

# ==================== 步骤4:PE比率清洗 ====================
# 移除">"符号(如">100"表示PE超过100)
df_clean['PE'] = df_clean['PE'].astype(str).str.replace('>', '')
# 转换为数值
df_clean['PE'] = pd.to_numeric(df_clean['PE'], errors='coerce')

print('步骤4: PE比率清洗')
print(df_clean[['PE']].head())
print()

# ==================== 最终验证 ====================
print('=== 清洗后数据类型 ===')
print(df_clean.dtypes)
# Date: datetime64[ns]
# Code: object(字符串)
# Open, Close: float64
# Volume: object(原始列)
# Volume_Num: float64
# PE: float64
print()

print('=== 清洗后数据 ===')
print(df_clean.head())
print()

print('=== 数据质量报告 ===')
print(f'总行数: {len(df_clean)}')
# df_clean.dropna():删除包含任何NaN的行
print(f'完整行数: {df_clean.dropna().shape[0]}')
print('\n各列缺失值统计:')
print(df_clean.isnull().sum())

代码深度解析:

  1. 清洗函数设计:
    • clean_volume()函数处理多种单位
    • 使用.apply()逐元素处理
    • 返回数值类型,便于后续计算
  2. 清洗顺序:
    • 先处理日期(建立时间轴)
    • 再处理数值(便于计算)
    • 最后处理分类变量(股票代码等)
  3. 错误处理:
    • errors='coerce': 无法转换转为NaN
    • 便于后续识别和处理错误记录

17.3 逻辑错误检测与处理

17.3.1 价格异常检测

列表 17.5
# =============================================================================
# 题目:检测和处理价格异常值
# =============================================================================
# 本任务演示如何基于业务规则检测价格数据的异常值
# 场景:检测股票价格中的不可能值和极端值

# ==================== 导入必要的库 ====================
import pandas as pd  # Pandas数据分析库
import numpy as np  # NumPy数值计算库

# ==================== 创建含价格异常的数据 ====================
# 设置随机种子
np.random.seed(42)
# 创建日期序列
dates = pd.date_range('2024-01-01', periods=20)
# 生成正常价格序列(随机游走)
normal_prices = 100 + np.random.randn(20).cumsum() * 0.5

# ==================== 人为添加异常值 ====================
prices = normal_prices.copy()
# 异常1:负价格(不可能,索引5)
prices[5] = -50
# 异常2:异常高价(索引10)
prices[10] = 5000
# 异常3:接近零的价格(索引15)
prices[15] = 0.01

# ==================== 创建数据框 ====================
df_prices = pd.DataFrame({
    'Date': dates,
    'Price': prices,
    # 生成随机成交量
    'Volume': np.random.randint(10000, 100000, 20)
})

# ==================== 显示数据 ====================
print('股票价格数据(含异常):')
print(df_prices)
print()

# ==================== 异常检测规则 ====================
# 规则1:价格不能为负或零
# df_prices['Price'] <= 0:返回布尔序列,True表示异常
rule1 = df_prices['Price'] <= 0

# 规则2:价格不能超过前一日价格的300%(涨停限制)
# .shift(1):将序列向下移动1位(获取前一日价格)
df_prices['Prev_Price'] = df_prices['Price'].shift(1)
# 价格 > 前一日价格 * 3:涨幅超过200%,可能是错误
rule2 = df_prices['Price'] > df_prices['Prev_Price'] * 3

# 规则3:价格不能低于前一日价格的30%(跌停限制)
# 价格 < 前一日价格 * 0.3:跌幅超过70%,可能是错误
rule3 = df_prices['Price'] < df_prices['Prev_Price'] * 0.3

# ==================== 合并所有规则 ====================
# |:按位或运算,满足任一规则即为异常
anomaly_mask = rule1 | rule2 | rule3

# ==================== 输出异常检测结果 ====================
print('异常检测结果:')
print(f'异常记录数: {anomaly_mask.sum()}')
print('\n异常记录详情:')
# df_prices[anomaly_mask]:选择异常记录
anomaly_records = df_prices[anomaly_mask].copy()
# 添加原因列
anomaly_records['Reason'] = ''
# 标记每条异常记录的原因
anomaly_records.loc[rule1, 'Reason'] = '非正价格'
anomaly_records.loc[rule2, 'Reason'] = '异常涨幅'
anomaly_records.loc[rule3, 'Reason'] = '异常跌幅'
print(anomaly_records[['Date', 'Price', 'Reason']])
print()

# ==================== 处理策略 ====================
# 策略1:删除异常记录
# ~anomaly_mask:取反,选择非异常记录
df_clean_drop = df_prices[~anomaly_mask].copy()
print('策略1: 删除异常记录')
print(f'原始记录数: {len(df_prices)}')
print(f'清洗后记录数: {len(df_clean_drop)}')
print()

# 策略2:用前值填充
df_clean_ffill = df_prices.copy()
# .fillna(method='ffill'):用前一个有效值填充
df_clean_ffill.loc[anomaly_mask, 'Price'] = df_clean_ffill['Price'].fillna(method='ffill')
print('策略2: 前值填充')
print(df_clean_ffill.loc[anomaly_mask, ['Date', 'Price']])
print()

# 策略3:标记但不删除(保留用于人工审核)
df_marked = df_prices.copy()
# 添加标记列
df_marked['Is_Anomaly'] = anomaly_mask
df_marked['Anomaly_Reason'] = ''
df_marked.loc[rule1, 'Anomaly_Reason'] = '非正价格'
df_marked.loc[rule2, 'Anomaly_Reason'] = '异常涨幅'
df_marked.loc[rule3, 'Anomaly_Reason'] = '异常跌幅'
print('策略3: 标记异常')
print(df_marked[df_marked['Is_Anomaly']][['Date', 'Price', 'Anomaly_Reason']])

补充说明:异常检测的统计学方法

除了规则检测,还可以使用统计方法:

  1. Z-Score方法: \[ Z = \frac{x - \mu}{\sigma} \] 通常认为 \(|Z| > 3\) 为异常

  2. IQR方法:

    • 异常值 < \(Q1 - 1.5 \times IQR\)
    • 异常值 > \(Q3 + 1.5 \times IQR\)
  3. 孤立森林(Isolation Forest):

    • 机器学习方法
    • 适合高维数据

17.3.2 成交量异常检测

列表 17.6
# =============================================================================
# 题目:成交量异常检测
# =============================================================================
# 本任务演示如何检测成交量数据的异常
# 场景:检测异常高/低成交量,可能是数据错误或重大事件

# ==================== 导入必要的库 ====================
import pandas as pd  # Pandas数据分析库
import numpy as np  # NumPy数值计算库

# ==================== 创建含成交量异常的数据 ====================
# 设置随机种子
np.random.seed(42)
# 创建50天的数据
n_days = 50
df_vol = pd.DataFrame({
    'Date': pd.date_range('2024-01-01', periods=n_days),
    # 价格序列(随机游走)
    'Price': 100 + np.random.randn(n_days).cumsum() * 0.5,
    # 成交量:随机生成100万到1000万之间
    'Volume': np.random.randint(1000000, 10000000, n_days),
    # 成交额:随机生成
    'Turnover': np.random.uniform(100000000, 1000000000, n_days)
})

# ==================== 添加异常成交量 ====================
# 异常1:异常高成交量(索引10)
df_vol.loc[10, 'Volume'] = 100000000
# 异常2:异常低成交量(索引25)
df_vol.loc[25, 'Volume'] = 100
# 异常3:零成交量(索引40)
df_vol.loc[40, 'Volume'] = 0

# ==================== 显示数据 ====================
print('成交量数据:')
print(df_vol[['Date', 'Volume']].head(15))
print()

# ==================== 异常检测 ====================
# 规则1:成交量为零或负
rule1 = df_vol['Volume'] <= 0

# 规则2:成交量超过历史均值的10倍
volume_mean = df_vol['Volume'].mean()
rule2 = df_vol['Volume'] > volume_mean * 10

# 规则3:成交量低于历史均值的1%
rule3 = df_vol['Volume'] < volume_mean * 0.01

# 合并规则
anomaly_vol = rule1 | rule2 | rule3

# ==================== 输出检测结果 ====================
print('成交量异常检测:')
print(f'历史均值: {volume_mean:,.0f}')
print(f'异常记录数: {anomaly_vol.sum()}')
print('\n异常记录:')
vol_anomalies = df_vol[anomaly_vol].copy()
vol_anomalies['Reason'] = ''
vol_anomalies.loc[rule1, 'Reason'] = '非正成交量'
vol_anomalies.loc[rule2, 'Reason'] = '异常高成交量'
vol_anomalies.loc[rule3, 'Reason'] = '异常低成交量'
print(vol_anomalies[['Date', 'Volume', 'Reason']])
print()

# ==================== 验证:成交额 = 价格 × 成交量 ====================
# 估计成交额 = 价格 × 成交量 × 100(假设每手100股)
df_vol['Est_Turnover'] = df_vol['Price'] * df_vol['Volume'] * 100
# 计算估计成交额与实际成交额的比率
df_vol['Turnover_Ratio'] = df_vol['Est_Turnover'] / df_vol['Turnover']

print('成交额验证(比例应在合理范围内):')
print(df_vol[['Date', 'Turnover', 'Est_Turnover', 'Turnover_Ratio']].head(10))

# 成交额比例异常(可能数据错误)
# 比例 > 2 或 < 0.5:估计值与实际值相差超过2倍
turnover_anomaly = (df_vol['Turnover_Ratio'] > 2) | (df_vol['Turnover_Ratio'] < 0.5)
print(f'\n成交额比例异常记录: {turnover_anomaly.sum()} 条')

金融应用逻辑:

  1. 成交量异常可能的原因:
    • 异常高成交量: 可能是数据错误,也可能是真实重大事件
    • 异常低成交量: 可能是停牌、半日交易
    • 零成交量: 几乎可以肯定是数据错误或停牌
  2. 交叉验证:
    • 成交量 × 价格 ≈ 成交额
    • 如果比例严重偏离,至少有一个数据错误
  3. 处理策略:
    • 零成交量: 设为NaN或删除
    • 异常高: 保留但标记,可能需要人工确认
    • 异常低: 检查是否交易日特殊

17.3.3 一致性检验

列表 17.7
# =============================================================================
# 题目:数据间一致性检验
# =============================================================================
# 本任务演示如何检测数据间的逻辑矛盾
# 场景:验证指数数据的一致性(开盘价、收盘价、收益率)

# ==================== 导入必要的库 ====================
import pandas as pd  # Pandas数据分析库
import numpy as np  # NumPy数值计算库

# ==================== 创建指数数据 ====================
# 设置随机种子
np.random.seed(42)
# 10天的数据
n_days = 10

# 指数点位:3000点为基础,每天随机变化
index_values = [3000 + i * 10 + np.random.randn() * 5 for i in range(n_days)]

# 日收益率:从点位计算
returns = pd.Series(index_values).pct_change().tolist()

# ==================== 创建DataFrame(故意引入不一致) ====================
df_index = pd.DataFrame({
    'Date': pd.date_range('2024-01-01', periods=n_days),
    # 收盘点位
    'Index_Close': index_values,
    # 开盘点位:在收盘价附近随机波动
    'Index_Open': [v + np.random.uniform(-10, 10) for v in index_values],
    # 日收益率:从点位计算(后面会故意修改)
    'Daily_Return': returns,
    # 成交量
    'Volume': np.random.randint(100000000, 500000000, n_days)
})

# ==================== 人为添加不一致 ====================
# 不一致1:修改收盘价,但不更新收益率
df_index.loc[5, 'Index_Close'] = 3100
# 不一致2:设置错误的收益率
df_index.loc[3, 'Daily_Return'] = -0.5

# ==================== 显示数据 ====================
print('指数数据:')
print(df_index)
print()

# ==================== 一致性检验 ====================
# 检验1:收盘价应为正
consist1 = df_index['Index_Close'] <= 0

# 检验2:开盘价不应偏离收盘价太多(假设单日涨跌不超过20%)
# 开盘价 / 收盘价 > 1.2 或 < 0.8:开盘价比收盘价高/低20%以上
consist2 = (df_index['Index_Open'] / df_index['Index_Close'] > 1.2) | \
          (df_index['Index_Open'] / df_index['Index_Close'] < 0.8)

# 检验3:从收盘价计算的收益率应与报告的收益率接近
# 计算实际收益率:.pct_change()计算百分比变化
df_index['Calculated_Return'] = df_index['Index_Close'].pct_change()
# 允许小数点误差:绝对差值 > 0.01(1%)
consist3 = (df_index['Calculated_Return'] - df_index['Daily_Return']).abs() > 0.01

# 合并不一致检验
inconsistency_mask = consist1 | consist2 | consist3

# ==================== 输出检验结果 ====================
print('一致性检验结果:')
print(f'不一致记录数: {inconsistency_mask.sum()}')
print('\n不一致详情:')
inconsistent_records = df_index[inconsistency_mask].copy()
# 添加问题描述列,初始化为空字符串
inconsistent_records['Issues'] = ''
if consist1.any():
    inconsistent_records.loc[consist1, 'Issues'] = '收盘价非正;'
if consist2.any():
    inconsistent_records.loc[consist2, 'Issues'] = '开盘价异常;'
if consist3.any():
    inconsistent_records.loc[consist3, 'Issues'] = '收益率不匹配;'

print(inconsistent_records[['Date', 'Index_Close', 'Daily_Return', 'Calculated_Return', 'Issues']])

理论背景:金融数据一致性约束

金融数据存在内在的逻辑约束:

  1. 价格约束:
    • \(Price_t > 0\): 价格必须为正
    • \(Open_t \approx Close_{t-1}\): 当日开盘价接近前日收盘
    • \(High_t \geq \max(Open_t, Close_t)\): 最高价不低于开盘价和收盘价
    • \(Low_t \leq \min(Open_t, Close_t)\): 最低价不高于开盘价和收盘价
  2. 收益率约束:
    • \(Return_t = (Price_t - Price_{t-1}) / Price_{t-1}\): 收益率定义
    • \(|Return_t| \leq Limit\): 涨跌幅限制
  3. 成交量约束:
    • \(Volume_t \geq 0\): 成交量非负
    • \(Turnover_t \approx Price_t \times Volume_t\): 成交额一致性
  4. 市值约束:
    • \(MarketCap_t = Price_t \times SharesOutstanding\): 市值计算
    • \(PE_t = Price_t / EPS_t\): 市盈率计算

17.4 实战案例股指数据清洗

17.4.1 数据加载与初始诊断

列表 17.8
# =============================================================================
# 题目:股指数据加载与质量评估
# =============================================================================
# 本任务演示真实金融数据的加载和质量评估
# 场景:沪深300指数的历史数据,含多种错误

# ==================== 导入必要的库 ====================
import pandas as pd  # Pandas数据分析库
import numpy as np  # NumPy数值计算库
import matplotlib.pyplot as plt  # Matplotlib绘图库

# ==================== 设置中文字体 ====================
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

# ==================== 模拟股指数据(含多种错误) ====================
# 设置随机种子
np.random.seed(42)
# 50天的数据
n = 50
dates = pd.date_range('2024-01-01', periods=n)

# 生成正常的指数点位(3000点为基础)
index_values = 3000 + np.arange(n) * 5 + np.random.randn(n) * 20

# ==================== 创建数据框 ====================
df_index = pd.DataFrame({
    'Date': dates,
    # 开盘价:在基础值附近随机波动
    'Open': index_values + np.random.uniform(-5, 5, n),
    # 最高价:基础值 + 正向随机
    'High': index_values + np.random.uniform(0, 10, n),
    # 最低价:基础值 - 负向随机
    'Low': index_values - np.random.uniform(0, 10, n),
    # 收盘价:基础值
    'Close': index_values,
    # 成交量:随机生成(1亿~5亿)
    'Volume': np.random.randint(100000000, 500000000, n),
    # 成交额:随机生成(1000亿~5000亿,需指定int64避免Windows 32位溢出)
    'Amount': np.random.randint(100000000000, 500000000000, n, dtype=np.int64)
})

# ==================== 人为添加错误 ====================
# 格式错误1:千分位逗号
df_index.loc[5, 'Close'] = '3,250.5'
# 格式错误2:含单位
df_index.loc[10, 'Volume'] = '200000000股'

# 逻辑错误1:负价格
df_index.loc[15, 'Close'] = -3000
# 逻辑错误2:最高价 < 最低价
df_index.loc[20, 'High'] = df_index.loc[20, 'Low'] - 100
# 逻辑错误3:最低价 > 收盘价
df_index.loc[25, 'Low'] = 3500

# 异常值1:异常高成交量
df_index.loc[30, 'Volume'] = 10000000000
# 异常值2:异常高指数点位(单日暴涨40%)
df_index.loc[35, 'Close'] = 5000

# ==================== 显示原始数据 ====================
print('=== 原始数据预览 ===')
print(df_index.head(15))
print('\n数据类型:')
print(df_index.dtypes)
print('\n基本统计:')
print(df_index.describe())

17.4.2 格式错误修复

列表 17.9
# =============================================================================
# 题目:修复股指数据的格式错误
# =============================================================================
# 本任务演示如何修复格式错误,使数据可用于分析

# ==================== 复制数据 ====================
df_fixed = df_index.copy()

print('=== 格式错误修复 ===\n')

# ==================== 修复1:移除千分位 ====================
# 检查Close列是否为字符串类型
if df_fixed['Close'].dtype == 'object':
    # .astype(str):确保是字符串
    # .str.replace(',', ''):移除逗号
    df_fixed['Close'] = df_fixed['Close'].astype(str).str.replace(',', '')
    print('步骤1: 移除价格列的千分位逗号')

# ==================== 修复2:转换为数值 ====================
numeric_cols = ['Open', 'High', 'Low', 'Close']
for col in numeric_cols:
    # 如果列是字符串类型,需要转换
    if df_fixed[col].dtype == 'object':
        # pd.to_numeric(..., errors='coerce'):转换为数值,无法转换的变为NaN
        df_fixed[col] = pd.to_numeric(df_fixed[col], errors='coerce')
        print(f'步骤2: 将{col}转换为数值类型')

# ==================== 修复3:成交量清洗 ====================
if df_fixed['Volume'].dtype == 'object':
    # 定义清洗函数
    def clean_vol(vol):
        # 检查是否为NaN
        if pd.isna(vol):
            return np.nan
        # 移除单位符号
        vol_str = str(vol).replace('股', '').replace(',', '').replace('万', '0000')
        try:
            # 转换为浮点数
            return float(vol_str)
        except:
            # 转换失败,返回NaN
            return np.nan

    # .apply(clean_vol):对每个元素应用清洗函数
    df_fixed['Volume'] = df_fixed['Volume'].apply(clean_vol)
    print('步骤3: 清洗成交量格式')

# ==================== 转换后验证 ====================
print(f'\n格式修复后数据类型:')
print(df_fixed.dtypes)

17.4.3 逻辑错误检测与修复

列表 17.10
# =============================================================================
# 题目:检测和修复股指数据逻辑错误
# =============================================================================
# 本任务演示如何检测和修复数据的逻辑错误

# ==================== 初始化错误列表 ====================
print('=== 逻辑错误检测与修复 ===\n')
errors = []

# ==================== 规则1:价格必须为正 ====================
error_price_negative = df_fixed['Close'] <= 0
if error_price_negative.any():
    # 记录错误
    errors.append(('负价格', error_price_negative.sum()))
    # 修复:用前值填充
    # 将负价格设为NaN
    df_fixed.loc[error_price_negative, 'Close'] = np.nan
    # .fillna(method='ffill'):用前一个有效值填充
    df_fixed['Close'].fillna(method='ffill', inplace=True)
    print(f'规则1: 修复{error_price_negative.sum()}条负价格记录(前值填充)')

# ==================== 规则2:High >= Close, Low <= Close ====================
# 最高价不应低于收盘价,最低价不应高于收盘价
error_high_low = (df_fixed['High'] < df_fixed['Close']) | (df_fixed['Low'] > df_fixed['Close'])
if error_high_low.any():
    errors.append(('高低价错误', error_high_low.sum()))
    # 修复:重新计算High和Low
    # 如果High < Close,将High设为Close
    df_fixed.loc[df_fixed['High'] < df_fixed['Close'], 'High'] = df_fixed['Close']
    # 如果Low > Close,将Low设为Close
    df_fixed.loc[df_fixed['Low'] > df_fixed['Close'], 'Low'] = df_fixed['Close']
    print(f'规则2: 修复{error_high_low.sum()}条高低价错误记录')

# ==================== 规则3:单日涨跌幅不超过15% ====================
# 计算前一日收盘价
df_fixed['Prev_Close'] = df_fixed['Close'].shift(1)
# 计算日收益率:(收盘价 - 前收盘价) / 前收盘价
df_fixed['Daily_Change'] = (df_fixed['Close'] - df_fixed['Prev_Close']) / df_fixed['Prev_Close']
# 检测异常涨跌:|收益率| > 15%
error_excess_change = df_fixed['Daily_Change'].abs() > 0.15
if error_excess_change.any():
    errors.append(('异常涨跌', error_excess_change.sum()))
    print(f'规则3: 发现{error_excess_change.sum()}条异常涨跌记录')
    print('  建议人工审核以下日期:')
    # 显示异常记录的日期和价格
    print(df_fixed[error_excess_change][['Date', 'Close', 'Prev_Close', 'Daily_Change']])

# ==================== 规则4:成交量异常 ====================
volume_median = df_fixed['Volume'].median()
# 成交量 > 中位数 * 10 或 < 中位数 * 0.01
error_volume = (df_fixed['Volume'] > volume_median * 10) | (df_fixed['Volume'] < volume_median * 0.01)
if error_volume.any():
    errors.append(('异常成交量', error_volume.sum()))
    print(f'规则4: 发现{error_volume.sum()}条异常成交量记录')

# ==================== 错误汇总 ====================
print(f'\n=== 错误汇总 ===')
for error_type, count in errors:
    print(f'{error_type}: {count}条')

# ==================== 清洗后统计 ====================
print(f'\n=== 清洗后数据统计 ===')
print(df_fixed[['Open', 'High', 'Low', 'Close', 'Volume']].describe())

17.4.4 数据质量可视化

列表 17.11
# ==================== 创建对比图 ====================
# 创建2行2列的子图布局
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# ==================== 子图1:指数走势 ====================
axes[0, 0].plot(df_fixed['Date'], df_fixed['Close'], 'b-', linewidth=2)
axes[0, 0].set_title('指数收盘价走势', fontsize=12)
axes[0, 0].set_ylabel('点位', fontsize=10)
axes[0, 0].grid(True, alpha=0.3)
# 标记异常涨跌
if error_excess_change.any():
    anomalous_dates = df_fixed[error_excess_change]['Date']
    anomalous_prices = df_fixed[error_excess_change]['Close']
    axes[0, 0].scatter(anomalous_dates, anomalous_prices, color='red', s=50, zorder=5, label='异常涨跌')
    axes[0, 0].legend()

# ==================== 子图2:日收益率分布 ====================
returns = df_fixed['Daily_Change'].dropna()
axes[0, 1].hist(returns, bins=30, color='steelblue', edgecolor='black')
axes[0, 1].axvline(returns.mean(), color='red', linestyle='--', label=f'均值: {returns.mean():.4f}')
axes[0, 1].axvline(returns.median(), color='green', linestyle='--', label=f'中位数: {returns.median():.4f}')
axes[0, 1].set_title('日收益率分布', fontsize=12)
axes[0, 1].set_xlabel('收益率', fontsize=10)
axes[0, 1].set_ylabel('频数', fontsize=10)
axes[0, 1].legend(fontsize=9)
axes[0, 1].grid(True, alpha=0.3)

# ==================== 子图3:成交量 ====================
axes[1, 0].bar(range(len(df_fixed)), df_fixed['Volume'], color='coral', edgecolor='black')
axes[1, 0].set_title('成交量', fontsize=12)
axes[1, 0].set_xlabel('交易序号', fontsize=10)
axes[1, 0].set_ylabel('成交量', fontsize=10)
axes[1, 0].grid(True, alpha=0.3, axis='y')

# ==================== 子图4:价格波动(振幅) ====================
# 振幅 = (最高价 - 最低价) / 最低价 * 100%
df_fixed['Amplitude'] = (df_fixed['High'] - df_fixed['Low']) / df_fixed['Low'] * 100
axes[1, 1].plot(df_fixed['Date'], df_fixed['Amplitude'], 'g-', linewidth=1.5)
axes[1, 1].axhline(df_fixed['Amplitude'].mean(), color='red', linestyle='--', label='平均振幅')
axes[1, 1].set_title('日内振幅(%)', fontsize=12)
axes[1, 1].set_ylabel('振幅(%)', fontsize=10)
axes[1, 1].legend(fontsize=9)
axes[1, 1].grid(True, alpha=0.3)

# ==================== 调整布局并显示 ====================
plt.tight_layout()
plt.show()

# ==================== 数据质量指标 ====================
print('=== 数据质量指标 ===')
print(f'1. 完整性: 缺失值比例={df_fixed.isnull().sum().sum() / df_fixed.size:.2%}')
print(f'2. 一致性: 逻辑错误已修复')
print(f'3. 准确性: {len(errors)}类错误已处理')
print(f'4. 有效记录: {len(df_fixed)}/{len(df_index)} ({len(df_fixed)/len(df_index):.1%})')